
try opening with contextmanager #370

Merged
merged 4 commits into pangeo-forge:beam-refactor on Jun 1, 2022

Conversation

rabernat
Contributor

This is one possible workaround for fsspec/filesystem_spec#579

This fails with the following error:

apache_beam/coders/coder_impl.py:268: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

x = <xarray.Dataset>
Dimensions:  (time: 1, lat: 18, lon: 36)
Coordinates:
  * time     (time) datetime64[ns] 2010-01-01
 ...
    bar      (time, lat, lon) int64 ...
    foo      (time, lat, lon) float64 ...
Attributes:
    conventions:  CF 1.6

>   lambda x: dumps(x, protocol), pickle.loads)

../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/coders/coders.py:802: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <File-like object HTTPFileSystem, http://127.0.0.1:53014/000.nc>

    def __reduce__(self):
        return (
            reopen,
            (
                self.fs,
                self.url,
                self.mode,
                self.blocksize,
>               self.cache.name,
                self.size,
            ),
        )
E       AttributeError: 'NoneType' object has no attribute 'name'

../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/fsspec/implementations/http.py:637: AttributeError

During handling of the above exception, another exception occurred:

pcoll_xarray_datasets = <_ChainedPTransform(PTransform) label=[Create|OpenWithFSSpec|OpenWithXarray] at 0x7ff11a6e23d0>

    def test_OpenWithXarray(pcoll_xarray_datasets):
        with TestPipeline() as p:
            output = p | pcoll_xarray_datasets
    
>           assert_that(output, is_xr_dataset(), label="is xr.Dataset")

tests/test_beam.py:93: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/pipeline.py:596: in __exit__
    self.result = self.run()
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/testing/test_pipeline.py:112: in run
    result = super().run(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/pipeline.py:546: in run
    return Pipeline.from_runner_api(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/pipeline.py:573: in run
    return self.runner.run_pipeline(self, self._options)
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/direct/direct_runner.py:131: in run_pipeline
    return runner.run_pipeline(pipeline, options)
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:199: in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:210: in run_via_runner_api
    return self.run_stages(stage_context, stages)
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:392: in run_stages
    stage_results = self._run_stage(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:657: in _run_stage
    self._run_bundle(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:780: in _run_bundle
    result, splits = bundle_manager.process_bundle(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py:1091: in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py:378: in push
    response = self.worker.do_instruction(request)
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:580: in do_instruction
    return getattr(self, request_type)(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py:618: in process_bundle
    bundle_processor.process_bundle(instruction_id))
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py:995: in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py:221: in process_encoded
    self.output(decoded_value)
apache_beam/runners/worker/operations.py:346: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:348: in apache_beam.runners.worker.operations.Operation.output
    ???
apache_beam/runners/worker/operations.py:215: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:707: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:708: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1200: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1265: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1198: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:536: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1361: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:215: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:707: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:708: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1200: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1265: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1198: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:536: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1361: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:215: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:707: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:708: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1200: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1265: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1198: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:536: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1361: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:215: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:707: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:708: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1200: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1265: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1198: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:536: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1361: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:215: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:707: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/worker/operations.py:708: in apache_beam.runners.worker.operations.DoOperation.process
    ???
apache_beam/runners/common.py:1200: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:1281: in apache_beam.runners.common.DoFnRunner._reraise_augmented
    ???
apache_beam/runners/common.py:1198: in apache_beam.runners.common.DoFnRunner.process
    ???
apache_beam/runners/common.py:536: in apache_beam.runners.common.SimpleInvoker.invoke_process
    ???
apache_beam/runners/common.py:1361: in apache_beam.runners.common._OutputProcessor.process_outputs
    ???
apache_beam/runners/worker/operations.py:214: in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
    ???
apache_beam/runners/worker/operations.py:178: in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    ???
apache_beam/runners/worker/opcounters.py:211: in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    ???
apache_beam/runners/worker/opcounters.py:250: in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    ???
apache_beam/coders/coder_impl.py:1425: in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:1436: in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:377: in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.get_estimated_size_and_observables
    ???
apache_beam/coders/coder_impl.py:416: in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
    ???
apache_beam/coders/coder_impl.py:441: in apache_beam.coders.coder_impl.FastPrimitivesCoderImpl.encode_to_stream
    ???
apache_beam/coders/coder_impl.py:268: in apache_beam.coders.coder_impl.CallbackCoderImpl.encode_to_stream
    ???
../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/apache_beam/coders/coders.py:802: in <lambda>
    lambda x: dumps(x, protocol), pickle.loads)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <File-like object HTTPFileSystem, http://127.0.0.1:53014/000.nc>

    def __reduce__(self):
        return (
            reopen,
            (
                self.fs,
                self.url,
                self.mode,
                self.blocksize,
>               self.cache.name,
                self.size,
            ),
        )
E       AttributeError: 'NoneType' object has no attribute 'name' [while running 'Create|OpenWithFSSpec|OpenWithXarray/OpenWithXarray/Open with Xarray']

../../anaconda/envs/pangeo-forge-recipes/lib/python3.8/site-packages/fsspec/implementations/http.py:637: AttributeError

@martindurant (Contributor) commented May 27, 2022

--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -642,7 +642,7 @@ class HTTPFile(AbstractBufferedFile):
                 self.url,
                 self.mode,
                 self.blocksize,
-                self.cache.name,
+                self.cache.name if self.cache else "none",
                 self.size,
             ),
         )
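To illustrate why the unguarded attribute access breaks pickling, here is a minimal self-contained sketch of the same `__reduce__` pattern with the guard applied. `FakeHTTPFile` and `reopen` are hypothetical stand-ins for fsspec's real `HTTPFile` and module-level `reopen`, not the actual implementations:

```python
import pickle

def reopen(url, cache_name):
    # Stand-in for fsspec's module-level reopen helper: rebuild the file
    # from plain, picklable state instead of serializing a live handle.
    return FakeHTTPFile(url)

class FakeHTTPFile:
    """Toy stand-in for fsspec's HTTPFile, illustrating the fix above."""
    def __init__(self, url, cache=None):
        self.url = url
        self.cache = cache  # may be None, e.g. for non-cached streaming files

    def __reduce__(self):
        return (
            reopen,
            (
                self.url,
                # Unguarded, self.cache.name raises AttributeError when
                # cache is None -- the failure shown in the traceback above.
                self.cache.name if self.cache else "none",
            ),
        )

f = FakeHTTPFile("http://127.0.0.1:53014/000.nc")  # no cache attached
restored = pickle.loads(pickle.dumps(f))           # round-trips with the guard
assert restored.url == f.url
```

Without the `if self.cache else "none"` guard, `pickle.dumps(f)` raises the same `AttributeError: 'NoneType' object has no attribute 'name'`.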

?


@rabernat (Contributor, Author)

This PR works with fsspec/filesystem_spec#973! 🎉

Thanks for the suggestion Martin!

Comment on lines 54 to 58
try:
    ds = xr.open_dataset(open_file, **self.xarray_open_kwargs)
except ValueError:
    with open_file as fp:
        ds = xr.open_dataset(fp, **self.xarray_open_kwargs)
@rabernat (Contributor, Author) commented May 28, 2022

The goal is to remove the try / except block and have consistent behavior for all open files. But if I always use the context manager, like this,

Suggested change
-try:
-    ds = xr.open_dataset(open_file, **self.xarray_open_kwargs)
-except ValueError:
-    with open_file as fp:
-        ds = xr.open_dataset(fp, **self.xarray_open_kwargs)
+with open_file as fp:
+    ds = xr.open_dataset(fp, **self.xarray_open_kwargs)

I get the error

E   TypeError: cannot pickle '_io.BufferedReader' object
[while running 'Create|OpenWithFSSpec|OpenWithXarray/OpenWithXarray/Open with Xarray']

This only happens when opening from the cache. In that case, we are not using fsspec.open but rather using the filesystem open method that is attached to the cache object, i.e.

def open_file(self, path: str, **kwargs) -> OpenFileType:
    """Returns an fsspec open file"""
    full_path = self._full_path(path)
    logger.debug(f"returning open file for {full_path}")
    return self.fs.open(full_path, **kwargs)

So we need to try to reconcile the behavior of get_opener with cache.open_file, such that they return the same type of open file object.
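One way to reconcile the two kinds of return value is a small adapter that yields a usable file object either way. This is only a sketch: `ensure_open` is a hypothetical helper, and `LazyOpen` is a stdlib stand-in for fsspec's `OpenFile` (which stores picklable state and opens a real handle only on `__enter__`):

```python
import contextlib
import io
import os
import tempfile

@contextlib.contextmanager
def ensure_open(obj):
    """Yield a readable file object, whether given an already-open file
    (as cache.open_file / self.fs.open return) or a lazy opener that must
    be entered first (as fsspec.open's OpenFile must be)."""
    if isinstance(obj, io.IOBase):
        # Concrete, already-open file object: just ensure it gets closed.
        with contextlib.closing(obj) as fp:
            yield fp
    else:
        # Lazy opener / context manager: entering it yields the real file.
        with obj as fp:
            yield fp

class LazyOpen:
    """Toy stand-in for fsspec's OpenFile: holds only a path, no live handle."""
    def __init__(self, path):
        self.path = path
    def __enter__(self):
        self._f = open(self.path, "rb")
        return self._f
    def __exit__(self, *exc):
        self._f.close()

# Demonstrate both call paths against a temporary file.
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"data")
    path = tmp.name

with ensure_open(LazyOpen(path)) as fp:    # lazy opener, like fsspec.open
    assert fp.read() == b"data"
with ensure_open(open(path, "rb")) as fp:  # concrete file, like fs.open
    assert fp.read() == b"data"
os.remove(path)
```

The `isinstance(obj, io.IOBase)` test works here because concrete file objects subclass `io.IOBase` while lazy openers like `OpenFile` do not; a production version would need to confirm that assumption for every filesystem in play.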

@alxmrs (Contributor) commented May 31, 2022

I've read some of the discussion around this issue, and I'd like to offer a few pointers.

To achieve the goals described here, consider using Apache Beam's Filesystems API. It requires no extra dependency besides Beam.

I've used it to solve a similar problem here: https://github.com/google/xarray-beam/blob/main/xarray_beam/_src/pangeo_forge.py#L115

@rabernat (Contributor, Author) commented Jun 1, 2022

Alex, I appreciate the suggestion to look into the Beam filesystems. I just read up on them and they look nice. However, fsspec is too tightly intertwined with our stack to abandon at this stage. Xarray and Zarr both have very helpful integrations with and optimizations for fsspec filesystems.

This PR seems to work now with fsspec/filesystem_spec#973. Surfacing issues and making upstream improvements for fsspec is an important secondary goal for our project, and that's what we will continue to do for now.

@rabernat (Contributor, Author) commented Jun 1, 2022

As predicted, the upstream-dev tests pass! 🚀

I'm going to merge this for now. @martindurant, it would be good to get an fsspec release soon 🙏 but we can move forward with the upstream-dev environment.

@rabernat rabernat marked this pull request as ready for review June 1, 2022 21:33
@rabernat rabernat merged commit d933e57 into pangeo-forge:beam-refactor Jun 1, 2022